package com.primeton.poctag.task;


import java.io.Closeable;
import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * <p>
 * </p>
 * Created by zhaopx on 2018/1/12 0012-18:39
 */
public class SparkJobOutputStream extends LogOutputStream implements Closeable {



    /**
     * 从 console log 中获取 appid 的 正则
     */
    final String pa = "Client: Application report for (application_\\d+_\\d+) \\(state: (ACCEPTED|RUNNING|FINISHED|FAILED|KILLED)\\)";


    /**
     * Yarn App
     */
    final Pattern YARN_APPID_PATTERN = Pattern.compile(pa);



    /**
     * 提交的应用 Yarn 程序ID
     */
    String appId;


    /**
     * 程序的状态
     */
    String state = "UNKNOWN";


    public SparkJobOutputStream(String baseDir, String jobId, String taskId) {
        super(baseDir, jobId, taskId);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        String line = new String(b, off, len, charset);
        super.write(line);
        Matcher matcher = YARN_APPID_PATTERN.matcher(line);
        if(matcher.find()){
            String appId = matcher.group(1);
            String state = matcher.group(2);
            setAppId(appId);
            setState(state);
            return;
        }
    }

    public String getAppId() {
        return appId;
    }

    public String getState() {
        return state;
    }

    public void setAppId(String appId) {
        this.appId = appId;
    }

    public void setState(String state) {
        this.state = state;
        //JobQueue.runningTask(getAppId(), jobId, taskId, state);
    }

}
